Project Description¶

In this project, we take on the role of a data analyst at an energy company. Previously, analysts from other departments had to manually collect, clean, and prepare data every quarter to track changes in electricity sales and generation capabilities. This manual process was time-consuming and often frustrating for the teams involved.

Our goal is to automate this workflow by creating a data pipeline that retrieves and processes data every month. This pipeline will make it easier and faster for the company to gain insights from the data and allow analysts to spend more time on important tasks like analysis and decision-making rather than data preparation.

We will work with two raw data files:

  • electricity_sales.csv
  • electricity_capability_nested.json

The electricity_sales.csv file contains detailed records of electricity sales. Below is a data dictionary that describes the structure of this CSV file:

| Field            | Data Type |
| ---------------- | --------- |
| period           | str       |
| stateid          | str       |
| stateDescription | str       |
| sectorid         | str       |
| sectorName       | str       |
| price            | float     |
| price-units      | str       |

We will write Python code to build and test this automated pipeline. It will include steps for reading, transforming, and cleaning both datasets. Once completed, the pipeline will help deliver regular, clean, and structured data to support quicker and more accurate energy insights across the organization.

Question No 1: Define an extract_tabular_data() function to ingest tabular data. This function will take a single parameter, file_path. If file_path ends with .csv, use the pd.read_csv() function to extract the data. If file_path ends with .parquet, use the pd.read_parquet() function to extract the data.¶

In [1]:
import pandas as pd
import json

def extract_tabular_data(file_path: str):
    """Extract data from a tabular file_format, with pandas."""
    if file_path.endswith(".csv"):
        return pd.read_csv(file_path)
    
    elif file_path.endswith(".parquet"):
        return pd.read_parquet(file_path)
    
    else:
        raise Exception("Warning: Invalid file extension. Please try with .csv or .parquet!")

Question No 2: Create another function with the name extract_json_data(), which takes a file_path. Use the json_normalize() function from the pandas library to flatten the nested JSON data, and return a pandas DataFrame.¶

In [2]:
def extract_json_data(file_path):
    """Extract and flatten data from a JSON file."""
    # First, read in the JSON file into memory using the json library
    with open(file_path, "r") as json_file:
        raw_data = json.load(json_file)
    
    
    return pd.json_normalize(raw_data)
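
To see what `json_normalize()` does, here is a minimal sketch on a small nested record. The field names below are invented for illustration and do not come from `electricity_capability_nested.json`.

```python
import pandas as pd

# A hypothetical nested record, similar in shape to the capability data
raw_data = [
    {
        "period": "2023",
        "producer": {"id": 1, "name": "Plant A"},
        "capability": {"value": 120.5, "units": "MW"},
    }
]

# Nested keys are flattened into dotted column names,
# e.g. "producer.id" and "capability.value"
flat_df = pd.json_normalize(raw_data)
print(list(flat_df.columns))
```

Flattening up front means the rest of the pipeline can treat the JSON source exactly like the tabular one.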

Question No 3: Create a function called transform_electricity_sales_data() which takes a single parameter raw_data of type pd.DataFrame. The function needs to fulfil the requirements described below.¶

- Drop any records with NA values in the `price` column. Do this inplace.
- Only keep records with a `sectorName` of "residential" or "transportation".
- Create a `year` column using the first 4 characters of the values in `period`.
- Create a `month` column using the last 2 characters of the values in `period`.
- Return the transformed `DataFrame`, keeping only the columns `year`, `month`, `stateid`, `price` and `price-units`
In [3]:
def transform_electricity_sales_data(raw_data: pd.DataFrame):
    """
    Transform electricity sales to find the total amount of electricity sold
    in the residential and transportation sectors.
    """
    # Drop any records with NA values in the price column, in place
    raw_data.dropna(subset=["price"], inplace=True)
    
    # Only keep residential and transportation records; copy the slice
    # to avoid pandas' SettingWithCopyWarning when adding columns below
    cleaned_df = raw_data.loc[raw_data["sectorName"].isin(["residential", "transportation"]), :].copy()
    
    # Create year and month columns from the period string
    cleaned_df["year"] = cleaned_df["period"].str[0:4]
    cleaned_df["month"] = cleaned_df["period"].str[-2:]
    
    # Only keep the columns year, month, stateid, price, and price-units
    cleaned_df = cleaned_df.loc[:, ["year", "month", "stateid", "price", "price-units"]]
    
    return cleaned_df
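
The transformation can be checked on a small hand-made DataFrame. The sample rows below are invented for illustration, and the function is repeated so the snippet is self-contained.

```python
import pandas as pd

def transform_electricity_sales_data(raw_data: pd.DataFrame):
    """Keep residential/transportation sales with a price, split period into year/month."""
    raw_data.dropna(subset=["price"], inplace=True)
    cleaned_df = raw_data.loc[raw_data["sectorName"].isin(["residential", "transportation"]), :].copy()
    cleaned_df["year"] = cleaned_df["period"].str[0:4]
    cleaned_df["month"] = cleaned_df["period"].str[-2:]
    return cleaned_df.loc[:, ["year", "month", "stateid", "price", "price-units"]]

# Three sample records: one residential, one commercial, one with a missing price
sample = pd.DataFrame({
    "period": ["2023-01", "2023-01", "2023-02"],
    "stateid": ["CA", "CA", "TX"],
    "sectorName": ["residential", "commercial", "transportation"],
    "price": [19.9, 18.2, None],
    "price-units": ["cents per kilowatthour"] * 3,
})

# Only the residential row survives: the commercial row is filtered out
# and the transportation row is dropped for its missing price
result = transform_electricity_sales_data(sample)
print(result)
```

Walking a couple of edge-case rows through the function like this is a cheap way to confirm the filtering and the `period` split behave as the requirements describe.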

Question No 4: Define a function called load(), which takes a DataFrame and a file_path. If the file_path ends with .csv, load the DataFrame to a CSV file. If instead the file_path ends with .parquet, load the DataFrame to a Parquet file.¶

In [11]:
def load(dataframe: pd.DataFrame, file_path: str):
    """
    Load a DataFrame to a file in either CSV or Parquet format.
    """
    # Check to see if the file path ends with .csv or .parquet
    if file_path.endswith(".csv"):
        # Write without the index so the file contains only the data columns
        dataframe.to_csv(file_path, index=False)
        
    elif file_path.endswith(".parquet"):
        dataframe.to_parquet(file_path)
    
    # Otherwise, throw an exception
    else:
        raise Exception(f"Warning: {file_path} is not a valid file type. Please try again!")
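
Putting the pieces together, a round trip through `load()` can be sketched with a temporary file. The output file name is a placeholder, and the function is repeated here so the snippet runs on its own.

```python
import os
import tempfile

import pandas as pd

def load(dataframe: pd.DataFrame, file_path: str):
    """Load a DataFrame to a file in either CSV or Parquet format."""
    if file_path.endswith(".csv"):
        dataframe.to_csv(file_path, index=False)
    elif file_path.endswith(".parquet"):
        dataframe.to_parquet(file_path)
    else:
        raise Exception(f"Warning: {file_path} is not a valid file type. Please try again!")

# Load a tiny DataFrame to CSV, then read it back and compare
with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "cleaned_sales.csv")  # hypothetical output path
    df = pd.DataFrame({"stateid": ["CA"], "price": [19.9]})
    load(df, out_path)
    round_trip = pd.read_csv(out_path)

print(round_trip.equals(df))  # True
```

A round-trip check like this is a simple end-of-pipeline test: if the reloaded DataFrame matches the original, the load step is not silently adding or reshaping columns.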